Change bulk upload batch size when there is a sync error.
authorCamila <hello@camila.codes>
Tue, 22 Aug 2023 17:25:41 +0000 (19:25 +0200)
committerMatthieu Gallien <matthieu.gallien@nextcloud.com>
Thu, 10 Apr 2025 12:54:13 +0000 (14:54 +0200)
- Cancel sync after try with smaller bulk upload batch size also throws error.
- Continue syncing when there is no error.
- Remove file from bulk upload black list only when upload succeeded.

Signed-off-by: Camila <hello@camila.codes>
src/libsync/bulkpropagatorjob.cpp
src/libsync/bulkpropagatorjob.h
src/libsync/owncloudpropagator.cpp

index 7e2154198d88f7598cb52af1f7229bb8606ed0fd..307be1177e7fbb598cff83618e537abff12ce9df 100644 (file)
@@ -70,6 +70,7 @@ Q_LOGGING_CATEGORY(lcBulkPropagatorJob, "nextcloud.sync.propagator.bulkupload",
 BulkPropagatorJob::BulkPropagatorJob(OwncloudPropagator *propagator, const std::deque<SyncFileItemPtr> &items)
     : PropagatorJob(propagator)
     , _items(items)
+    , _currentBatchSize(batchSize)
 {
     _filesToUpload.reserve(batchSize);
     _pendingChecksumFiles.reserve(batchSize);
@@ -83,7 +84,7 @@ bool BulkPropagatorJob::scheduleSelfOrChild()
 
     _state = Running;
 
-    for(auto i = 0; i < batchSize && !_items.empty(); ++i) {
+    for(auto i = 0; i < _currentBatchSize && !_items.empty(); ++i) {
         const auto currentItem = _items.front();
         _items.pop_front();
         _pendingChecksumFiles.insert(currentItem->_file);
@@ -107,6 +108,29 @@ bool BulkPropagatorJob::scheduleSelfOrChild()
     return _items.empty() && _filesToUpload.empty();
 }
 
+bool BulkPropagatorJob::handleBatchSize()
+{
+    // no error, no batch size to change
+    if (_finalStatus == SyncFileItem::Success || _finalStatus == SyncFileItem::NoStatus) {
+        qCDebug(lcBulkPropagatorJob) << "No error, no need to change the bulk upload batch size!";
+        return true;
+    }
+
+    // change batch size before trying it again
+    const auto halfBatchSize = batchSize / 2;
+
+    // we already tried to upload with half of the batch size
+    if(_currentBatchSize == halfBatchSize) {
+        qCDebug(lcBulkPropagatorJob) << "There was another error, stop syncing now!";
+        return false;
+    }
+
+    // try to upload with half of the batch size
+    _currentBatchSize = halfBatchSize;
+    qCDebug(lcBulkPropagatorJob) << "There was an error, sync again with bulk upload batch size cut to half!";
+    return true;
+}
+
 PropagatorJob::JobParallelism BulkPropagatorJob::parallelism() const
 {
     return PropagatorJob::JobParallelism::FullParallelism;
@@ -265,13 +289,16 @@ void BulkPropagatorJob::checkPropagationIsDone()
             // just wait for the other job to finish.
             return;
         }
-
-        qCInfo(lcBulkPropagatorJob) << "final status" << _finalStatus;
-        emit finished(_finalStatus);
-        propagator()->scheduleNextJob();
     } else {
-        scheduleSelfOrChild();
+        if (handleBatchSize()) {
+            scheduleSelfOrChild();
+            return;
+        }
     }
+
+    qCInfo(lcBulkPropagatorJob) << "final status" << _finalStatus;
+    emit finished(_finalStatus);
+    propagator()->scheduleNextJob();
 }
 
 void BulkPropagatorJob::slotComputeTransmissionChecksum(SyncFileItemPtr item,
@@ -418,6 +445,9 @@ void BulkPropagatorJob::slotPutFinishedOneFile(const BulkUploadItem &singleFile,
 
     singleFile._item->_status = SyncFileItem::Success;
 
+    // upload succeeded, so remove from black list
+    propagator()->removeFromBulkUploadBlackList(singleFile._item->_file);
+
     // Check the file again post upload.
     // Two cases must be considered separately: If the upload is finished,
     // the file is on the server and has a changed ETag. In that case,
index 95189bf2670fdfbd1b00a1f2dd6e0e0f5401262b..611bf8a9e0854f18f3cbc24157359c956df0e6f3 100644 (file)
@@ -162,6 +162,8 @@ private:
 
     void checkPropagationIsDone();
 
+    bool handleBatchSize();
+
     std::deque<SyncFileItemPtr> _items;
 
     QVector<AbstractNetworkJob *> _jobs; /// network jobs that are currently in transit
@@ -173,6 +175,7 @@ private:
     qint64 _sentTotal = 0;
 
     SyncFileItem::Status _finalStatus = SyncFileItem::Status::NoStatus;
+    int _currentBatchSize = 0;
 };
 
 }
index beec5b96610a665d64eb1da00b560375d05bc47e..8e4e6b0b307c7e38fa39dc57b41406fdfad312e8 100644 (file)
@@ -427,8 +427,6 @@ std::unique_ptr<PropagateUploadFileCommon> OwncloudPropagator::createUploadJob(S
 
     job->setDeleteExisting(deleteExisting);
 
-    removeFromBulkUploadBlackList(item->_file);
-
     return job;
 }
 
@@ -1338,8 +1336,9 @@ void PropagatorCompositeJob::finalize()
 {
     // The propagator will do parallel scheduling and this could be posted
     // multiple times on the event loop, ignore the duplicate calls.
-    if (_state == Finished)
+    if (_state == Finished) {
         return;
+    }
 
     _state = Finished;
     emit finished(_hasError == SyncFileItem::NoStatus ? SyncFileItem::Success : _hasError);